[FLINK-33466][Connectors/Kafka]: Bounded Kafka source never finishes after restore from savepoint #71

Open
jwtryg wants to merge 1 commit into main

Conversation


jwtryg commented Dec 7, 2023

This fix snapshots the noMoreNewPartitionSplits variable, so that the signalNoMoreSplits signal is sent to the readers again after restoring from a snapshot.

Currently, a bounded Kafka source never finishes after restoring from a snapshot, because the NoMoreSplits signal is never sent to the readers.
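
To illustrate the idea, here is a minimal, self-contained sketch. These are toy classes that only mirror the control flow - not the actual KafkaSourceEnumerator/KafkaSourceEnumState - and the field and method names are purely illustrative:

// Toy illustration of the fix idea (not the actual Flink classes): the
// "no more splits" flag must be part of the enumerator snapshot, otherwise a
// restored enumerator never re-sends NoMoreSplits and bounded readers never finish.
import java.util.function.Consumer;

class ToyEnumState {
    final boolean noMoreNewPartitionSplits; // hypothetical field carried in the snapshot

    ToyEnumState(boolean noMoreNewPartitionSplits) {
        this.noMoreNewPartitionSplits = noMoreNewPartitionSplits;
    }
}

class ToyEnumerator {
    private boolean noMoreNewPartitionSplits;
    private final Consumer<String> signalToReaders;

    ToyEnumerator(ToyEnumState restoredState, Consumer<String> signalToReaders) {
        this.signalToReaders = signalToReaders;
        // With the fix: restore the flag from the snapshot.
        this.noMoreNewPartitionSplits =
            restoredState != null && restoredState.noMoreNewPartitionSplits;
    }

    void start() {
        // With the fix: re-send the signal after restore, so the bounded job can finish.
        if (noMoreNewPartitionSplits) {
            signalToReaders.accept("NoMoreSplits");
        }
    }

    void onPartitionDiscoveryFinished() {
        noMoreNewPartitionSplits = true;
        signalToReaders.accept("NoMoreSplits");
    }

    ToyEnumState snapshotState() {
        // The fix: keep the flag in the snapshot instead of dropping it here.
        return new ToyEnumState(noMoreNewPartitionSplits);
    }
}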

I'm aware that I have not yet been assigned to a corresponding issue, but I have yet to hear anything from the community (I have already tried submitting an issue and raising the problem here). So please let me know if I should go another way, or if you disagree with the approach - then we can work something else out together :)

I have created a test case using the testcontainers framework and the Flink MiniCluster that showcases the problem - the test runs forever without the fix, but finishes successfully with it:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import java.time.Duration;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import com.tryg.reconciler.util.KafkaContainer; // custom testcontainers-based Kafka helper
import static java.util.Map.entry;

public class BoundedKafkaSavepointIntegrationTests {
    private static final int NUM_PARTITIONS = 1;
    private static final Random random = new Random(BoundedKafkaSavepointIntegrationTests.class.hashCode());
    
    StreamExecutionEnvironment env;
    KafkaContainer kafkaContainer;
    Properties kafkaProperties;

    Configuration checkpointConfig;
    String lastCheckpointPath;
    String checkpointFolder;

    String inputTopicName;
    String outputTopicName;
    List<String> inputTopic;
    
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(1)
                .setNumberTaskManagers(1)
                .build());
        
    @Rule 
    public TemporaryFolder folder = new TemporaryFolder();

    @Before
    public void setup() throws Exception {
        // Setup checkpoints
        folder.create();
        checkpointFolder = folder.newFolder().getAbsolutePath();

        // Create StreamExecutionEnvironment
        Configuration conf = new Configuration();
        conf.setString("state.checkpoints.dir", "file://" + checkpointFolder);
        env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // configure test environment
        env.setParallelism(NUM_PARTITIONS);

        // Start kafka container
        kafkaContainer = new KafkaContainer(null);
        kafkaContainer.start();

        // Create topics
        inputTopicName = "input-topic-" + random.nextInt(Integer.MAX_VALUE);
        outputTopicName = "output-topic-" + random.nextInt(Integer.MAX_VALUE);
        kafkaContainer.createTopics(NUM_PARTITIONS, inputTopicName, outputTopicName);
        
        // Create kafka properties
        kafkaProperties = new Properties();
        kafkaProperties.putAll(Map.ofEntries(
            entry(ConsumerConfig.GROUP_ID_CONFIG, "GROUP-ID-" + random.nextInt(Integer.MAX_VALUE)),
            entry("bootstrap.servers", kafkaContainer.getBootstrapServers()),
            entry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        ));

        checkpointConfig = new Configuration();
    }

    @After
    public void cleanUp() {
        kafkaContainer.stop();
        folder.delete();
    }

    @Test
    public void tests() throws Exception {
        inputTopic = List.of("1", "2", "3");
        produceDataToInputTopic(inputTopic);
        process(true, false);
        produceDataToInputTopic(List.of("4", "5", "6", "7", "8"));
        process(false, true);
    }

    private void produceDataToInputTopic(List<String> msgs) {
        KafkaRecordSerializationSchema<String> serializationSchema = 
            KafkaRecordSerializationSchema.builder()
            .setTopic(inputTopicName)
            .setValueSerializationSchema(new SimpleStringSchema())
            .build();

        List<ProducerRecord<byte[], byte[]>> records = msgs.stream()
            .map(m -> serializationSchema.serialize(m, null, null))
            .collect(Collectors.toList());

        pushToKafka(records);
    }

    private void pushToKafka(List<ProducerRecord<byte[], byte[]>> records) {
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer())) {
            records.forEach(record -> {
                producer.send(record);
            });
            producer.flush();
        }
    }
    
    // Builds and runs the bounded Kafka pipeline; optionally stops the job with a
    // savepoint (createCheckpoint) or restores it from the last savepoint (fromCheckpoint).
    private void process(boolean createCheckpoint, boolean fromCheckpoint) throws Exception {
        env.getCheckpointConfig().configure(checkpointConfig);
        TopicPartition tp = new TopicPartition(inputTopicName, 0);

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBounded(OffsetsInitializer.offsets(Map.of(tp, (long) inputTopic.size() + 1)))
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema()))
            .setTopics(inputTopicName)
            .setProperties(kafkaProperties)
            .build();

        DataStream<String> stream = env
            .fromSource(source, WatermarkStrategy.noWatermarks(), inputTopicName)
            .name(inputTopicName)
            .uid(inputTopicName);

        stream
            .keyBy(ignored -> "sameKeyAlways")
            .addSink(new CollectSink());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        if (fromCheckpoint) {
            jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath(lastCheckpointPath));
            System.out.print("Resuming from savepoint " + lastCheckpointPath + "\n");
            System.out.flush();
        }

        // Submit job
        flinkCluster.before();
        ClusterClient<?> client = flinkCluster.getClusterClient();
        JobID jobId = client.submitJob(jobGraph).get();

        // Poll the job status until the job finishes, or until we stop it with a savepoint.
        while (true) {
            JobStatus status = client.getJobStatus(jobId).get();
            System.out.print("Status: " + status + ", collected values: " + CollectSink.values + "\n");
            System.out.flush();

            if (createCheckpoint && status.equals(JobStatus.RUNNING)) {
                lastCheckpointPath = client.stopWithSavepoint(jobId, false, checkpointFolder, SavepointFormatType.CANONICAL).get();
                System.out.print("Stopping with savepoint\n");
                System.out.flush();
                break;
            }

            if (status.equals(JobStatus.FINISHED)) {
                break;
            }

            Thread.sleep(5000);
        }
    }

    private static class CollectSink implements SinkFunction<String> {
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}


boring-cyborg bot commented Dec 7, 2023

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

jwtryg changed the title from [FLINK-33466][Connectors/Kafka]: snapshot the noMoreNewPartitionSplits variable to [FLINK-33466][Connectors/Kafka]: Bounded Kafka source never finishes after restore from savepoint on Dec 7, 2023
@MartijnVisser
Contributor

@jwtryg Can you please rebase your PR to trigger CI?

@MartijnVisser
Contributor

@mas-chen Do you also want to take a look at this one?

Contributor

mas-chen left a comment

The fix generally makes sense to me. We need to resend the signal in case of job failure and restart. Can you please also add coverage for this case? I think you had a test in your PR description that could be adapted to the existing IT tests.
